Golang用のSnowflake DriverでファイルStreamをテーブルステージにアップロードしてみる
Golang用のSnowflake Driverを使い、ファイルを読み込んでテーブルステージにアップロードしてみました。 基本的には以下の公式のドキュメント通りなのですが、いくつか躓いたところもあったので、アップロードできたソースをここに残しておきたいと思います。
前提条件や環境について
Snowflakeのユーザやテーブルは作成済みであるものとします。今回は以下の記事の「users.csv」を「users」テーブルのテーブルステージにアップロードしました。予め作成しておいてください。
Snowflakeのテーブルステージ経由でファイルをロードしてみた
Golangの実行環境は、以下の記事のコンテナとしました。「/share/src」フォルダを作業フォルダとしたので、他の環境で行う場合は適宜読み替えてください。
Amazon Linux 2 の Dockerイメージから開発環境を作り Visual Studio Codeで接続してみる
Golangの実行環境ができたら、以下のコマンドでSnowflakeのDriverをインストールします。
$ go get github.com/snowflakedb/gosnowflake
ソースについて
まず最初にソースを貼り付けておきます。
package main import ( "context" "database/sql" "fmt" "log" "os" sf "github.com/snowflakedb/gosnowflake" // Go Snowflake Driver ) func main() { accountname := os.Getenv("accountname") username := os.Getenv("username") password := os.Getenv("password") database := os.Getenv("database") schema := os.Getenv("schema") table := "users" readFilePath := "/share/src/users.csv" uploadFileName := "upload_users.gz" // 拡張子をつける parallel := 4 dsn := fmt.Sprintf("%s:%s@%s/%s/%s", username, password, accountname, database, schema, ) db, err := sql.Open("snowflake", dsn) if err != nil { log.Fatal(err) } defer db.Close() fileStream, err := os.OpenFile(readFilePath, os.O_RDONLY, os.ModePerm) if err != nil { log.Fatal(err) } defer fileStream.Close() sql := fmt.Sprintf("put file://%s @%%%s auto_compress=true parallel=%v;", uploadFileName, table, parallel) _, err = db.ExecContext(sf.WithFileStream(context.Background(), fileStream), sql) if err != nil { log.Fatal(err) } fmt.Println("upload stream success!!") }
ユーザ名やパスワードなどについては、今回は環境変数から取得するようにしました。14〜18行目で読み込んでいます。こちらについては適切な値を設定してください。
10行目でSnowflake Driverをimportしています。このインポートしたDriverの「WithFileStream()」を呼び出し、引数にファイルのStreamを渡します。(45行目)
また同じく45行目で「put」コマンドをクエリとして(変数sqlに入れて)渡しています。「put」コマンド自体は以下のような構文となります。
> put file://アップロードファイル名 @%テーブル名 auto_compress=true
このアップロードファイル名ですが、「auto_compress=true」で圧縮してアップロードする場合は圧縮後の拡張子をつけるほうがよさそうです。理由としては、アップロードしたファイルを「copy」コマンドでテーブルに取り込もうとすると、拡張子がないと以下のエラーとなったためです。(なお拡張子をつけない場合でもアップロードされたファイルには「.gz」がつきました。)
>copy into users file_format = (type = csv skip_header = 1); 100079 (22000): Invalid data encountered during decompression for file: 'upload_users.gz',compression type used: 'GZIP', cause: 'data error'
このため、21行目でアップロードファイル名には拡張子をつけています。
ソースを実行し、アップロードされたことをsnowsqlなどで「list」コマンドを実行して確認します。
>list @%users; +-----------------+------+----------------------------------+-------------------------------+ | name | size | md5 | last_modified | |-----------------+------+----------------------------------+-------------------------------| | upload_users.gz | 128 | xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx | xxxxxxxxxxxxxxxxxxxxxxxxxxxxx | +-----------------+------+----------------------------------+-------------------------------+
最後に
Snowflake DriverでファイルのStreamをテーブルステージにアップロードするサンプル処理を書いてみました。何かの役に立てば幸いです。